<?php
/**
 * 消息队列管理
 */
include_once dirname(dirname(__FILE__)).DIRECTORY_SEPARATOR.'public.class.php';
class nl_message extends nl_public
{
	static $base_table='nns_mgtvbk_message';
	static $arr_filed = array(
			'nns_id',
            'nns_message_time',
            'nns_message_id',
            'nns_message_xml',
            'nns_message_content',
	        'nns_message_original_url',
            'nns_message_url',
            'nns_bk_queue_excute_url',
            'nns_message_state',
            'nns_action',
            'nns_type',
            'nns_name',
            'nns_create_time',
            'nns_again',
            'nns_delete',
            'nns_fail_time',
            'nns_modify_time',
            'nns_cp_id',
            'nns_package_id',
            'nns_xmlurlqc',
            'nns_encrypt',
            'nns_content_number',
	);

	/**
	 * 入库message表 
	 * @param object $dc 数据库操作对象
	 * @param array $params
	 */
	static public function insert_message($dc,$params)
	{
		if(empty($params))
		{
			return self::return_data(1,'新增数据为空');
		}
		$message_data = array();
		//读取毫秒级时间
		list ( $usec, $sec ) = explode ( ' ', microtime () );
		$time = str_pad(intval(substr($usec, 2, 4)),4,'0',STR_PAD_LEFT);

		$message_data['nns_id'] = np_guid_rand();
		$message_data['nns_message_time'] = date('YmdHis', time()) . $time;
		$message_data['nns_message_id'] = $params['msgid'];
		$message_data['nns_message_xml'] = isset($params['message_xml']) ? $params['message_xml'] : '';
		$message_data['nns_message_state'] = '0';
		$message_data['nns_message_content'] = isset($params['xml']) ? $params['xml'] : '';
		$message_data['nns_create_time'] = date('Y-m-d H:i:s', time());
		$message_data['nns_again'] = 0;
		$message_data['nns_delete'] = 0;
		//操作
 		$message_data['nns_action'] = isset($params['action']) ? $params['action'] : '';
 		$message_data['nns_cp_id'] = isset($params['cp_id']) ? $params['cp_id'] : 0;
 		$message_data['nns_type'] = isset($params['type']) ? $params['type'] : '';
 		$message_data['nns_name'] = isset($params['name']) ? $params['name'] : '';
 		$message_data['nns_package_id'] = isset($params['package_id']) ? $params['package_id'] : '';
		$field_str = '';
    	$value_str = '';
    	foreach($message_data as $k=>$v)
    	{
    		$field_str .= $k.',';
    		$value_str .= "'$v',";
    	}
    	$field_str = rtrim($field_str,',');
    	$value_str = rtrim($value_str,',');

    	$sql = "REPLACE into " . self::$base_table . " ($field_str) values($value_str)";
        $result = nl_execute_by_db($sql,$dc->db());
        if($result)
        {
        	return self::return_data(0,'ok');
        }
		else 
		{
			return self::return_data(1,'failed');
		}
	}
	/**
	 * 添加消息队列
	 * @param object $dc 数据库对象
	 * @param array $params 数据数组
	 * @return Ambigous <array('ret'=>'状态码','reason'=>'原因','data'=>'数据')
	 * @author liangpan
	 * @date 2016-03-06
	 */
	static public function add($dc,$params,$is_insert = '0')
	{
	    if(!isset($params['nns_id']) || strlen($params['nns_id'])<1)
		{
			return self::return_data(1,'guid为空');
		}
		$params = self::except_useless_params(self::$arr_filed, $params);
		if(empty($params))
		{
			return self::return_data(1,'参数为空');
		}
		$params['nns_create_time'] = $params['nns_modify_time'] = date('Y-m-d H:i:s');
		$str_guid = $params['nns_id'];
		if($is_insert !='1' && strlen($params['nns_message_id'])>0 && strlen($params['nns_cp_id'])>0)
		{
		    //先查询是否消息ID 是否存在相同的
		    $check_re = self::get_message_info($dc, array("nns_message_id" => $params['nns_message_id'],"nns_cp_id" => $params['nns_cp_id']));
		    if($check_re['ret'] != 0)
		    {
		        return self::return_data(1,'查询消息ID是否存在时，数据库执行失败');
		    }
		    if(!isset($check_re['data_info'][0]) || !is_array($check_re['data_info'][0]) || empty($check_re['data_info'][0]))
		    {
		        $sql = self::make_insert_sql(self::$base_table, $params);
		    }
		    else
		    {
		        $str_guid = $check_re['data_info'][0]['nns_id'];
    		    if($is_insert == '2')
                {
                    return self::return_data(2,'ok'.var_export(array("nns_message_id" => $params['nns_message_id'],"nns_cp_id" => $params['nns_cp_id']),true),$str_guid);
                }
    	        $sql = self::make_update_sql(self::$base_table, $params,array("nns_message_id" => $params['nns_message_id'],"nns_cp_id" => $params['nns_cp_id'],'nns_modify_time'=>date("Y-m-d H:i:s")));
		    }
		}
		else
		{
		    $sql = self::make_insert_sql(self::$base_table, $params);
		}
		$result = nl_execute_by_db($sql, $dc->db());
		if(!$result)
		{
			return self::return_data(1,'数据库执行失败'.$sql);
		}
		return self::return_data(0,'ok'.$sql,$str_guid);
	}
	
	/**
	 * 消息队列逻辑删除
	 * @param object $dc 数据库对象
	 * @param string $date 日期
	 * @param string $cp_id cpid
	 * @param int $fail_message_time
	 * @author liangpan
	 * @date 2016-03-06
	 */
	static public function logic_delete($dc,$date,$cp_id,$fail_message_time)
	{
		if(strlen($cp_id) <1)
		{
			return self::return_data(0,'ok');
		}
		$sql_update_delete = "update nns_mgtvbk_message set nns_delete='1',nns_modify_time='{$date}' where nns_delete='0' and nns_cp_id='{$cp_id}' and nns_again >= '{$fail_message_time}'";
		$result = nl_execute_by_db($sql_update_delete,$dc->db());
		if(!$result)
		{
			return self::return_data(1,'消息队列逻辑删除数据库执行失败'.$sql_update_delete);
		}
		return self::return_data(0,'ok');
	}
	
	/**
	 * 定时器查询消息队列数据
	 * @param object $dc 数据库对象
	 * @param string $cp_id cpid
	 * @author liangpan
	 * @date 2016-03-06
	 */
	static public function query_timer_message_info($dc,$cp_id=null)
	{
		$sql = "select * from nns_mgtvbk_message where nns_message_state=0 and nns_cp_id='{$cp_id}' ";
		if(strlen($cp_id) >0)
		{
			$sql.=" and nns_cp_id='{$cp_id}' ";
		}
		$sql.=" order by nns_message_time asc limit 100 ";
		$result = nl_query_by_db($sql,$dc->db());
		if(!$result)
		{
			return self::return_data(1,'消息队列逻辑删除数据库执行失败'.$sql);
		}
		return self::return_data(0,'ok',$result);
	}
	

	/**
	 * 定时器查询消息队列数据
	 * @param object $dc 数据库对象
	 * @param string $cp_id  cpid
	 * @param number $first_import_num  消息队列获取第一次消息注入的条数
	 * @param string $fail_import_enable 失败消息注入开关 false 关闭  true开启
	 * @param number $fail_import_num 失败消息获取条数
	 * @param string $child_import_enable  消息队列子级注入开关 false 关闭  true开启
	 * @param number $child_import_num 消息队列子级注入条数
	 * @param number $fail_message_time 失败次数
	 * @return Ambigous <array('ret'=>'状态码','reason'=>'原因','data'=>'数据'), multitype:NULL number string , number, mixed, multitype:NULL multitype:a r y s t i n g  array multitype:number  >
	 * @author liangpan
	 * @date 2016-03-06
	 */
	static public function query_timer_message_info_v2($dc,$cp_id=null,$first_import_num=50,$fail_import_enable=false,$fail_import_num=30,$child_import_enable=false,$child_import_num=30,$fail_message_time=50,$params=array())
	{
		$str_where_cp='';
		$str_where = '';
		if(strlen($cp_id) >0)
		{
			$str_where_cp=" and nns_cp_id='{$cp_id}' ";
		}
		if(!empty($params) && is_array($params))
		{
			foreach ($params as $key=>$fild)
			{
				if(isset($fild['in']) && is_array($fild['in']))
				{
					$str_fild = implode("','", $fild['in']);
					$str_where .= " and {$key} in ('{$str_fild}') ";
					continue;
				}
				$str_where .= " and {$key}='{$fild}' ";
			}
		}
		$result_fail = $result_child = true;
		$sql_first = "select * from nns_mgtvbk_message where nns_message_state=0 {$str_where_cp} {$str_where} order by nns_message_time asc limit {$first_import_num} ";
		$result_first = nl_query_by_db($sql_first,$dc->db());
		if($fail_import_enable)
		{
			$sql_fail = "select * from nns_mgtvbk_message where nns_message_state=4 and nns_delete='0' {$str_where_cp} {$str_where} and nns_again <'{$fail_message_time}' order by nns_fail_time asc limit {$fail_import_num} ";
			$result_fail = nl_query_by_db($sql_fail,$dc->db());
		}
		if($child_import_enable)
		{
			$sql_child = "select * from nns_mgtvbk_message where nns_message_state in(6,7) and nns_delete='0' {$str_where_cp} {$str_where} order by nns_fail_time asc limit {$child_import_num} ";
			$result_child = nl_query_by_db($sql_child,$dc->db());
		}
		if(!$result_first)
		{
			return self::return_data(1,'消息队列数据库查询执行失败'.$sql_first);
		}
		if(!$result_fail)
		{
			return self::return_data(1,'消息队列数据库查询执行失败'.$sql_fail);
		}
		if(!$result_child)
		{
			return self::return_data(1,'消息队列数据库查询执行失败'.$sql_child);
		}
		$result_data = array();
		if(is_array($result_first) && !empty($result_first))
		{
			$result_data = $result_first;
			unset($result_first);
		}
		if(is_array($result_fail) && !empty($result_fail))
		{
			$result_data = (is_array($result_data) && !empty($result_data)) ? array_merge($result_data,$result_fail) : $result_fail;
			unset($result_fail);
		}
		if(is_array($result_child) && !empty($result_child))
		{
			$result_data = (is_array($result_data) && !empty($result_data)) ? array_merge($result_data,$result_child) : $result_child;
			unset($result_child);
		}
		return self::return_data(0,'ok',$result_data);
		
// 		$sql = "(select * from nns_mgtvbk_message where nns_message_state=0 {$str_where_cp} order by nns_message_time asc limit 100) union " .
// 		" (select * from nns_mgtvbk_message where nns_message_state=4 and nns_delete='0'  {$str_where_cp} and nns_again <'{$g_message_time}' order by nns_fail_time asc limit 50) " .
// 		" union (select * from nns_mgtvbk_message where nns_message_state in(6,7)  {$str_where_cp} order by nns_fail_time asc limit 50) ";
// 		$result = nl_query_by_db($sql,$dc->db());
// 		if(!$result)
// 		{
// 			return self::return_data(1,'消息队列逻辑删除数据库执行失败'.$sql);
// 		}
// 		return self::return_data(0,'ok',$result);
	}
	
	/**
	 * 消息队列更改状态
	 * @param object $dc 数据库对象
	 * @param string $id guid
	 * @param array $update_fields 参数数组
	 * @author liangpan
	 * @date 2016-03-06
	 */
	static public function update_message_state($dc,$id, $update_fields,$flag=false)
	{
		//需要更新的字段不为空的时候它的类型必须是个数组
		if (empty($id) || (count($update_fields) > 0 && !is_array($update_fields)))
		{
			return self::return_data(0,'ok');
		}
		$sql = 'update nns_mgtvbk_message set ';
		if (!isset($update_fields['nns_modify_time']) || empty($update_fields['nns_modify_time']))
        {
            $sql .= 'nns_modify_time ="' . date('Y-m-d H:i:s') . '",';
        }
		foreach ($update_fields as $field => $value)
		{
			$sql .= "$field='$value',";
		}

		if($flag)
		{
			$sql .= "nns_again=nns_again+1,";
		}
		$sql = rtrim($sql, ',');
		$sql .= " where nns_id='{$id}'";
		$result = nl_execute_by_db($sql, $dc->db());
		if(!$result)
		{
			return self::return_data(1,'消息队列更改状态数据库执行失败'.$sql);
		}
		return self::return_data(0,'ok'.$sql,$result);
	}

	/**
	 * 更新nn_mgtvbk_message表中数据
	 *
	 * @param  $dc
	 * @param  array  $where  条件数组  说明：array('条件字段'=>'值')
	 * @param  array  $update_fields  更新数组  说明：array('字段'=>'值')
	 * @return  array
	 * @author <feijian.gao@starcor.cn>
	 * @date  2017年3月16日21:15:05
	 */
	static public function update_message($dc,$where, $update_fields)
	{
		$where_sql = '';
		$sql = 'update nns_mgtvbk_message set ';
		foreach ($update_fields as $field => $value)
		{
			$sql .= "$field='$value',";
		}
		$sql = rtrim($sql, ",");
		if (is_array($where) && count($where) > 0)
		{
			foreach ($where as $key => $value)
			{
				if (strlen($value) > 0)
				{
					$where_sql .= " {$key}='{$value}' and ";
				}
			}
		}
		if(!empty($where_sql))
		{
			$sql .= 'where ';
			$where_sql = ltrim($where_sql,'');
			$where_sql = rtrim($where_sql,' and ');
			$sql .= $where_sql;
		}

		$result = nl_execute_by_db($sql, $dc->db());
		if(!$result)
		{
			return self::return_data(1,'消息队列更改状态数据库执行失败'.$sql);
		}
		return self::return_data(0,'ok'.$sql,$result);
	}
	/**
	 * 根据条件查询消息
	 * @param $dc DC
	 * @param $params 条件数组
	 * @return array
	 */	
	static public function get_message_info($dc,$params = array())
	{
		$str_sql = "select * from nns_mgtvbk_message ";
		$where = '';
		if (is_array($params) && count($params) > 0)
		{
			foreach ($params as $key => $value)
			{
				if (strlen($value) > 0)
				{
					$where .= " {$key}='{$value}' and ";
				}
			}
		}
		if(!empty($where))
		{
			$str_sql .= 'where ';
			$where = ltrim($where,'');
			$where = rtrim($where,' and ');
			$str_sql .= $where;
		}
		$data = nl_query_by_db($str_sql, $dc->db());
		if($data === false)
		{
			return self::return_data(1,'数据库执行失败'.$str_sql);
		}
		if($data === true)
		{
			$data = array();
		}
		return self::return_data(0,'ok',$data);	
	}
	
	/**
	 * 消息查询
	 * @param unknown $dc
	 * @param unknown $message_id
	 * @param unknown $cp_id
	 */
	static public function query_message_by_message_id($dc,$message_id,$cp_id)
	{
	    $sql="select * from " . self::$base_table . " where nns_message_id='{$message_id}' and nns_cp_id='{$cp_id}' limit 1" ;
	    $result = nl_query_by_db($sql, $dc->db());
	    if(!$result)
	    {
	        return self::return_data(1,'消息队列查询失败'.$sql);
	    }
	    $result = (isset($result[0]) && !empty($result[0]) && is_array($result[0])) ? $result[0] : null;
	    return self::return_data(0,'ok'.$sql,$result);
	}
	
	
	/**
	 * 消息查询
	 * @param unknown $dc
	 * @param unknown $message_id
	 * @param unknown $cp_id
	 */
	static public function query_message_by_message_name($dc,$message_name,$cp_id)
	{
	    $sql="select * from " . self::$base_table . " where nns_name='{$message_name}' and nns_cp_id='{$cp_id}' limit 1" ;
	    $result = nl_query_by_db($sql, $dc->db());
	    if(!$result)
	    {
	        return self::return_data(1,'消息队列查询失败'.$sql);
	    }
	    $result = (isset($result[0]) && !empty($result[0]) && is_array($result[0])) ? $result[0] : null;
	    return self::return_data(0,'ok'.$sql,$result);
	}
	
	/**
	 * 查询单个列表
	 * @param object $dc 数据库对象
	 * @param string $nns_id guid
	 * @return Ambigous array('ret'=>'状态码','reason'=>'原因','data'=>'数据')
	 * @author liangpan
	 * @date 2016-03-06
	 */
	static public function query_by_id($dc,$nns_id)
	{
	    $sql = "select * from " . self::$base_table . " where nns_id='{$nns_id}' limit 1";
	    $result = nl_query_by_db($sql, $dc->db());
	    if(!$result)
	    {
	        return self::return_data(1,'数据库查询失败,sql:'.$sql);
	    }
	    $result = (isset($result[0]) && !empty($result[0])) ? $result[0] : null;
	    return self::return_data(0,'OK',$result);
	}
	
	
	/**
	 * 依据条件查询主媒资信息
	 * @param unknown $dc
	 * @param unknown $params
	 */
	static public function query_by_condition($dc,$params)
	{
	    $params = self::except_useless_params(self::$arr_filed, $params);
	    if(!is_array($params) || empty($params))
	    {
	        return self::return_data(1,'查询条件为空不允许查询');
	    }
	    $sql = self::make_query_sql(self::$base_table,$params);
	    $result = nl_query_by_db($sql, $dc->db());
	    if(!$result)
	    {
	        return self::return_data(1,'数据库查询失败,sql:'.$sql);
	    }
	    $result = (isset($result) && !empty($result)) ? $result : null;
	    return self::return_data(0,'OK'.$sql,$result);
	}
}